package com.jhf.youke.pulsar.domain.service;

import lombok.extern.log4j.Log4j2;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
* @Description:
* @Param:
* @return:
* @Author: RHJ
* @Date: 2022/11/18
*/
@Component
@Log4j2
public class ConsumeService {

    @Resource
    private ProducerClient producerClient;

    public void consume(String topic)throws Exception{
        producerClient.init();
        Consumer<String> consume = producerClient.getPulsarClient().newConsumer(Schema.STRING)
                //可以设置多个topic
                .topic(topic)
                .subscriptionName("test1")
                //设置共享模式
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();
        while (true){
            Message<String> receive = consume.receive();
            try {
                String msg = receive.getValue();
                log.info("msg {}", msg);
                consume.acknowledge(receive);

            }catch (Exception e){
                consume.negativeAcknowledge(receive);
            }
        }

    }

}
